1 /*
2  * Collie - An asynchronous event-driven network framework using Dlang development
3  *
4  * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd 
5  *
6  * Developer: putao's Dlang team
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module collie.channel.pipeline;
12 
13 import std.typecons;
14 import std.variant;
15 import std.functional;
16 import std.range.primitives;
17 
18 import collie.channel.handler;
19 import collie.channel.handlercontext;
20 import collie.channel.exception;
21 import collie.net;
22 import kiss.event;
23 
/**
 * Callback interface through which a pipeline talks to whatever owns it.
 *
 * A manager is attached with `PipelineBase.pipelineManager(manager)`.
 */
interface PipelineManager
{
    /// Invoked by `PipelineBase.deletePipeline()` with the pipeline that asks to be removed.
    void deletePipeline(PipelineBase pipeline);

    /// NOTE(review): not called anywhere in this module; presumably lets handlers
    /// reset the owner's idle/timeout tracking - confirm against implementers.
    void refreshTimeout();
}
29 
/**
 * Message-type independent base of a pipeline.
 *
 * Holds the ordered list of handler contexts (together with the inbound-only
 * and outbound-only views of that list), the transport the pipeline is bound
 * to and an optional PipelineManager.
 *
 * Every modification of the handler list clears the `_isFinalize` flag, so
 * `finalize()` has to be called again before the pipeline is used.
 */
abstract class PipelineBase
{
    this()
    {
    }

    ~this()
    {
    }

    /// Attaches the manager that is notified by `deletePipeline()`.
    pragma(inline)
    @property final void pipelineManager(PipelineManager manager)
    {
        _manager = manager;
    }

    /// Returns the attached manager, or null when none was set.
    pragma(inline,true)
    @property final PipelineManager pipelineManager()
    {
        return _manager;
    }

    /// Asks the attached manager (if any) to delete this pipeline.
    pragma(inline)
    final void deletePipeline()
    {
        if (_manager)
        {
            _manager.deletePipeline(this);
        }
    }

    /// Binds the pipeline to a transport channel.
    pragma(inline)
    @property final void transport(Channel transport)
    {
        _transport = transport;
    }

    /// Returns the transport channel the pipeline is bound to.
    pragma(inline,true)
    @property final Channel transport()
    {
        return _transport;
    }

    /// Appends `handler` at the end of the pipeline. Returns this pipeline for chaining.
    pragma(inline)
    final PipelineBase addBack(H)(H handler)
    {
        return addHelper(new ContextType!(H)(this, handler), false);
    }

    /// Inserts `handler` at the beginning of the pipeline. Returns this pipeline for chaining.
    pragma(inline)
    final PipelineBase addFront(H)(H handler)
    {
        return addHelper(new ContextType!(H)(this, handler), true);
    }

    /**
     * Removes the first context of handler type H whose handler equals `handler`.
     *
     * Throws: HandlerNotInPipelineException when no such handler is present.
     */
    pragma(inline)
    final PipelineBase remove(H)(H handler)
    {
        return removeHelper!H(handler, true);
    }

    /**
     * Removes the first context of handler type H, whatever handler instance it holds.
     *
     * Throws: HandlerNotInPipelineException when no such handler is present.
     */
    pragma(inline)
    final PipelineBase remove(H)()
    {
        return removeHelper!H(null, false);
    }

    /**
     * Removes the first handler of the pipeline.
     *
     * Throws: PipelineEmptyException when the pipeline has no handlers.
     */
    pragma(inline)
    final PipelineBase removeFront()
    {
        if (_ctxs.empty())
        {
            throw new PipelineEmptyException("No handlers in pipeline");
        }
        removeAt(0);
        return this;
    }

    /**
     * Removes the last handler of the pipeline.
     *
     * Throws: PipelineEmptyException when the pipeline has no handlers.
     */
    pragma(inline)
    final PipelineBase removeBack()
    {
        if (_ctxs.empty())
        {
            throw new PipelineEmptyException("No handlers in pipeline");
        }
        removeAt(_ctxs.length - 1);
        return this;
    }

    /// Returns the handler stored in the context at position `i`, which must be of handler type H.
    pragma(inline)
    final auto getHandler(H)(int i)
    {
        // The result used to be computed and dropped; it has to be returned.
        return getContext!H(i).handler;
    }

    /// Returns the handler of the first context of handler type H, or null when there is none.
    final auto getHandler(H)()
    {
        auto ctx = getContext!H();
        if (ctx)
            return ctx.handler;
        return null;
    }

    /// Returns the context at position `i`; asserts that it really is a `ContextType!H`.
    pragma(inline)
    auto getContext(H)(int i)
    {
        auto ctx = cast(ContextType!H)(_ctxs[i]);
        assert(ctx);
        return ctx;
    }

    /// Returns the first context that is a `ContextType!H`, or null when there is none.
    auto getContext(H)()
    {
        foreach (i; 0 .. _ctxs.length)
        {
            // _ctxs is a built-in array: plain indexing, there is no `.at()`.
            auto ctx = cast(ContextType!H)(_ctxs[i]);
            if (ctx)
                return ctx;
        }
        return null;
    }

    /// Re-links the handler contexts after the handler list has changed.
    void finalize();

    /// Calls `detachPipeline()` on every context currently in the pipeline.
    final void detachHandlers()
    {
        foreach (i; 0 .. _ctxs.length)
        {
            _ctxs[i].detachPipeline();
        }
    }

protected:
    PipelineContext[] _ctxs;    // all contexts, in pipeline order
    PipelineContext[] _inCtxs;  // contexts with direction IN or BOTH, in pipeline order
    PipelineContext[] _outCtxs; // contexts with direction OUT or BOTH, in pipeline order

    bool _isFinalize = true;    // cleared whenever the handler list is modified
private:
    PipelineManager _manager = null;
    Channel _transport;

    /// Inserts `ctx` at the front or at the back of `_ctxs` and of the
    /// direction specific lists that match `Context.dir`.
    final PipelineBase addHelper(Context)(Context ctx, bool front)
    {
        // Concatenation always produces a fresh array, so slices of the
        // previous lists held elsewhere are never modified in place.
        PipelineContext entry = ctx;

        _isFinalize = false;
        _ctxs = front ? entry ~ _ctxs : _ctxs ~ entry;
        if (Context.dir == HandlerDir.BOTH || Context.dir == HandlerDir.IN)
        {
            _inCtxs = front ? entry ~ _inCtxs : _inCtxs ~ entry;
        }
        if (Context.dir == HandlerDir.BOTH || Context.dir == HandlerDir.OUT)
        {
            _outCtxs = front ? entry ~ _outCtxs : _outCtxs ~ entry;
        }
        return this;
    }

    /// Removes the first context of handler type H; when `checkEqual` is set
    /// the stored handler must additionally compare equal to `handler`.
    final PipelineBase removeHelper(H)(H handler, bool checkEqual)
    {
        foreach (i; 0 .. _ctxs.length)
        {
            auto ctx = cast(ContextType!H) _ctxs[i];
            if (ctx && (!checkEqual || ctx.getHandler() == handler))
            {
                // Only the first match is removed, so leave right away.
                removeAt(i);
                return this;
            }
        }

        throw new HandlerNotInPipelineException("No such handler in pipeline");
    }

    /// Detaches the context at index `site` and drops it from every list it is part of.
    final void removeAt(size_t site)
    {
        import kiss.container.array;
        import std.algorithm.searching;

        _isFinalize = false;
        PipelineContext rctx = _ctxs[site];
        rctx.detachPipeline();
        removeSite(_ctxs,site);

        const auto dir = rctx.getDirection();
        if (dir == HandlerDir.BOTH || dir == HandlerDir.IN)
        {
            arrayRemove(_inCtxs,rctx,true);
        }

        if (dir == HandlerDir.BOTH || dir == HandlerDir.OUT)
        {
            // Outbound contexts live in _outCtxs; removing from _inCtxs here
            // left detached contexts in the outbound chain.
            arrayRemove(_outCtxs,rctx,true);
        }
    }
}
252 
/*
 * R is the inbound type, i.e. inbound calls start with pipeline.read(R)
 * W is the outbound type, i.e. outbound calls start with pipeline.write(W)
 *
 * Use void for one of the types if your pipeline is unidirectional.
 * If R is void, read() will be disabled.
 * If W is void, write() and close() will be disabled.
 */
261 
/**
 * Typed pipeline: R is the message type accepted by `read`, W the message
 * type accepted by `write`. Passing `void` for either type compiles the
 * corresponding direction out.
 */
final class Pipeline(R, W = void) : PipelineBase
{
    alias Ptr = Pipeline!(R, W);

    /// Creates a new, empty pipeline.
    static Ptr create()
    {
        return new Ptr();
    }

    ~this()
    {
        // Handlers are intentionally not detached here: with the GC the
        // contexts may already have been collected before the pipeline.
    }

    // A `void` parameter is illegal, so read() only exists for a real
    // inbound type (same scheme as write() below).
    static if (!is(R == void))
    {
        /**
         * Passes `msg` to the first inbound handler.
         *
         * Throws: NotHasInBoundException when there is no inbound handler.
         */
        pragma(inline)
        void read(R msg)
        {
            if (_front)
                _front.read(msg);
            else
                throw new NotHasInBoundException("read(): not have inbound handler in Pipeline");
        }
    }

    /**
     * Forwards a timeout notification to the first inbound handler.
     * Does nothing when R is void.
     *
     * Throws: NotHasInBoundException when there is no inbound handler.
     */
    pragma(inline,true)
    void timeOut()
    {
        static if (!is(R == void))
        {
            if (_front)
                _front.timeOut();
            else
                throw new NotHasInBoundException("timeOut(): not have inbound handler in Pipeline");
        }
    }

    /// Tells the first inbound handler, if any, that the transport became active.
    pragma(inline)
    void transportActive()
    {
        static if (!is(R == void))
        {
            if (_front)
            {
                _front.transportActive();
            }
        }
    }

    /// Tells the first inbound handler, if any, that the transport became inactive.
    pragma(inline)
    void transportInactive()
    {
        static if (!is(R == void))
        {
            if (_front)
            {
                _front.transportInactive();
            }
        }
    }

    static if (!is(W == void))
    {
        alias TheCallBack = void delegate(W, size_t);

        /**
         * Passes `msg` and the optional callback `cback` to the last outbound handler.
         *
         * Throws: NotHasOutBoundException when there is no outbound handler.
         */
        pragma(inline)
        void write(W msg, TheCallBack cback = null)
        {
            if (_back)
                _back.write(msg, cback);
            else
                throw new NotHasOutBoundException("write(): no outbound handler in Pipeline");
        }
    }

    /**
     * Asks the last outbound handler to close. Does nothing when W is void.
     *
     * Throws: NotHasOutBoundException when there is no outbound handler.
     */
    pragma(inline)
    void close()
    {
        static if (!is(W == void))
        {
            if (_back)
                _back.close();
            else
                throw new NotHasOutBoundException("close(): no outbound handler in Pipeline");
        }
    }

    /**
     * Links the inbound contexts front-to-back and the outbound contexts
     * back-to-front, then attaches every context to the pipeline.
     * Returns immediately when nothing changed since the last call.
     *
     * Throws: PipelineEmptyException when neither an inbound nor an outbound
     *         entry point could be determined.
     */
    override void finalize()
    {
        if (_isFinalize)
            return;

        _front = null;
        static if (!is(R == void))
        {
            if (!_inCtxs.empty())
            {
                _front = cast(InboundLink!R)(_inCtxs[0]);
                foreach (i; 1 .. _inCtxs.length)
                {
                    _inCtxs[i - 1].setNextIn(_inCtxs[i]);
                }
                // The last inbound context terminates the chain.
                _inCtxs[$ - 1].setNextIn(null);
            }
        }

        _back = null;
        static if (!is(W == void))
        {
            if (!_outCtxs.empty())
            {
                _back = cast(OutboundLink!W)(_outCtxs[$ - 1]);
                foreach_reverse (i; 1 .. _outCtxs.length)
                {
                    _outCtxs[i].setNextOut(_outCtxs[i - 1]);
                }
                // Outbound traffic flows towards index 0, which ends the chain.
                _outCtxs[0].setNextOut(null);
            }
        }

        for (size_t i = 0; i < _ctxs.length; ++i)
        {
            _ctxs[i].attachPipeline();
        }

        if (_front is null && _back is null)
            throw new PipelineEmptyException("No Handler in the Pipeline");

        _isFinalize = true;
    }

protected:
    this()
    {
        super();
    }

    this(bool isStatic)
    {
        _isStatic = isStatic;
        super();
    }

private:
    bool _isStatic = false;

    // Entry point of the inbound chain; a plain Object placeholder when the
    // inbound direction is disabled so that finalize() can still reset it.
    static if (!is(R == void))
    {
        InboundLink!R _front = null;
    }
    else
    {
        Object _front = null;
    }

    // Entry point of the outbound chain; same placeholder scheme as _front.
    static if (!is(W == void))
    {
        OutboundLink!W _back = null;
    }
    else
    {
        Object _back = null;
    }
}
430 
/**
 * Factory producing a pipeline of type `PipeLine` for a TCP stream.
 */
abstract shared class PipelineFactory(PipeLine)
{
    /// Returns the pipeline to use for `transport`.
    /// NOTE(review): presumably invoked once per accepted/connected stream - confirm with callers.
    PipeLine newPipeline(TcpStream transport);
}
435 
/// Pipeline whose inbound message type is `TcpStream` and whose outbound message type is `uint`.
alias AcceptPipeline = Pipeline!(TcpStream, uint);

/**
 * Factory producing an `AcceptPipeline` for a TCP listener.
 */
abstract shared class AcceptPipelineFactory
{
    /// Returns the accept pipeline to use for `acceptor`.
    AcceptPipeline newPipeline(TcpListener acceptor);
}